package main

import (
	"context"
	"encoding/json"
	"gitee.com/Cookie_XiaoD/easykafka"
	"gitee.com/Cookie_XiaoD/easykafka/spec"
	"log"
	"time"
)

// Package-level state shared between the consumer goroutine and main.
var (
	// consumer is assigned inside the goroutine launched by startConsumer
	// and is used by main to commit processed batches.
	consumer spec.Consumer

	// msgs carries messages from handleMsg to the batching loop in main.
	// The buffer holds up to 10000 messages before senders block.
	msgs = make(chan spec.Msg, 10000)

	// brokers is the broker address string handed to easykafka.NewConsumer.
	brokers = "127.0.0.1:9092"
)

// main starts the consumer and then loops forever, collecting messages from
// the msgs channel into fixed-size batches. Each full batch is decoded and
// logged message by message, then committed in one call to
// consumer.ConfirmBatch, after which the loop pauses for one second.
//
// Messages whose payload cannot be decoded as ExampleData are logged and
// skipped, but they remain part of the batch that is committed.
func main() {
	// batchSize is the number of messages gathered before a batch is
	// processed and committed.
	const batchSize = 5

	startConsumer()
	batch := make([]spec.Msg, 0, batchSize)
	// Simulate processing once every batchSize messages, followed by a
	// single batch commit when processing is done.
	for {
		msg := <-msgs
		batch = append(batch, msg)
		if len(batch) < batchSize {
			log.Println("接收到", len(batch), "条数据")
			continue
		}
		// The text below hard-codes the batch size; update it if batchSize changes.
		log.Println("接收到5条数据，开始处理")
		for _, v := range batch {
			var data ExampleData
			if err := json.Unmarshal(v.Data(), &data); err != nil {
				// Report the bad payload instead of dropping it silently, so
				// skipped messages can be traced by their position.
				log.Println("解析数据失败，跳过该条：", v.Topic(), v.Partition(), v.Offset(), err)
				continue
			}
			log.Println("处理数据：", data)
		}
		log.Println("处理完成开始批量提交")
		// consumer is assigned in the startConsumer goroutine before it calls
		// StartBlock; presumably no message reaches msgs before that point —
		// TODO confirm against easykafka's handler semantics.
		if err := consumer.ConfirmBatch(batch); err != nil {
			log.Println("批量提交失败:", err)
		} else {
			log.Println("批量提交成功")
		}
		// Reset the length but keep the backing array for the next batch.
		batch = batch[:0]
		time.Sleep(1 * time.Second)
	}
}

// startConsumer launches a background goroutine that creates the
// package-level consumer for topic "topic_example" in group "group_example",
// registers handleMsg and handleErr as callbacks, and then calls StartBlock
// with a background context. The function itself returns immediately.
//
// If the consumer cannot be created the process is terminated via log.Fatal.
// When StartBlock returns, the consumer is closed and any close error is
// logged.
func startConsumer() {
	go func() {
		var err error
		consumer, err = easykafka.NewConsumer(
			brokers,
			[]string{"topic_example"},
			"group_example",
			handleMsg,
			easykafka.WithConsumerErrorHandler(handleErr),
			// NOTE(review): presumably "AOR" is the auto-offset-reset policy — confirm.
			easykafka.WithConsumerAOR(spec.Earliest),
			// Manual commit is enabled; main commits via ConfirmBatch.
			easykafka.WithConsumerManualCommit(true))
		if err != nil {
			// Use log.Fatal rather than log.Fatalf(err.Error()): the error
			// text must not be interpreted as a format string, otherwise any
			// '%' in it garbles the output.
			log.Fatal(err)
		}
		defer func() {
			if err := consumer.Close(); err != nil {
				log.Println("关闭消费者发生错误：", err)
			}
		}()
		log.Println("开始接收数据")
		consumer.StartBlock(context.Background())
	}()
}

// handleMsg is the message callback passed to easykafka.NewConsumer. It logs
// the message's topic, partition and offset, then forwards the message to the
// msgs channel; the send blocks while the channel buffer is full.
func handleMsg(msg spec.Msg) {
	topic, partition, offset := msg.Topic(), msg.Partition(), msg.Offset()
	log.Println("接收到数据", topic, partition, offset)

	msgs <- msg
}

// handleErr is the error callback registered through
// easykafka.WithConsumerErrorHandler. It only logs the error it is given.
func handleErr(err *easykafka.ConsumeError) {
	const prefix = "发生错误："
	log.Println(prefix, err)
}

// ExampleData is the JSON payload that main decodes from each consumed
// message.
type ExampleData struct {
	// Content is read from the "content" JSON field.
	Content string `json:"content"`
	// Seq is read from the "seq" JSON field; it is decoded as a string.
	Seq     string `json:"seq"`
}
